// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package task

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"math"
	"net/http"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/docker/go-units"
	"github.com/fatih/color"
	"github.com/opentracing/opentracing-go"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/backup"
	"github.com/pingcap/tidb/br/pkg/checkpoint"
	"github.com/pingcap/tidb/br/pkg/conn"
	"github.com/pingcap/tidb/br/pkg/encryption"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/glue"
	"github.com/pingcap/tidb/br/pkg/httputil"
	"github.com/pingcap/tidb/br/pkg/logutil"
	"github.com/pingcap/tidb/br/pkg/metautil"
	"github.com/pingcap/tidb/br/pkg/restore"
	"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
	logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
	"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
	restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tidb/br/pkg/stream"
	"github.com/pingcap/tidb/br/pkg/streamhelper"
	advancercfg "github.com/pingcap/tidb/br/pkg/streamhelper/config"
	"github.com/pingcap/tidb/br/pkg/streamhelper/daemon"
	"github.com/pingcap/tidb/br/pkg/summary"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/util/cdcutil"
	"github.com/spf13/pflag"
	"github.com/tikv/client-go/v2/oracle"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

// Command-line flag names registered by the Define*Flags helpers below.
// Note that flagGCSafePointTTS is spelled "gc-ttl" on the command line and
// carries the TTL (in seconds) of BR's GC safepoint.
const (
	flagYes                = "yes"
	flagCleanUpCompactions = "clean-up-compactions"
	flagUntil              = "until"
	flagStreamJSONOutput   = "json"
	flagStreamTaskName     = "task-name"
	flagStreamStartTS      = "start-ts"
	flagStreamEndTS        = "end-ts"
	flagGCSafePointTTS     = "gc-ttl"

	// truncateLockPath is the lock file that RunStreamTruncate tries to take on the
	// external storage; hintOnTruncateLock is the hint passed along with that attempt.
	truncateLockPath   = "truncating.lock"
	hintOnTruncateLock = "There might be another truncate task running, or a truncate task that didn't exit properly. " +
		"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
)

// Timing knobs for waiting on an info schema reload.
// NOTE(review): the consumers are outside this part of the file; the names suggest
// a polling interval and an overall timeout — confirm at the use site.
const (
	waitInfoSchemaReloadCheckInterval = 1 * time.Second
	// A million tables should take a few minutes to load all DDL changes; 15 minutes
	// is chosen to make sure we don't exit early.
	waitInfoSchemaReloadTimeout = 15 * time.Minute
)

// Names of the log backup sub-commands. They are the keys of StreamCommandMap and
// are what RunStreamCommand receives as cmdName.
var (
	StreamStart    = "log start"
	StreamStop     = "log stop"
	StreamPause    = "log pause"
	StreamResume   = "log resume"
	StreamStatus   = "log status"
	StreamTruncate = "log truncate"
	StreamMetadata = "log metadata"
	StreamCtl      = "log advancer"

	// skipSummaryCommandList holds the commands for which RunStreamCommand does not
	// print the summary when it returns.
	skipSummaryCommandList = map[string]struct{}{
		StreamStatus:   {},
		StreamTruncate: {},
	}

	// NOTE(review): streamShiftDuration is consumed outside this part of the file;
	// its exact meaning should be confirmed at the use site.
	streamShiftDuration = time.Hour
)

// StreamCommandMap maps each log backup sub-command name to the function that
// executes it. RunStreamCommand looks the command up here and rejects names that
// are not present.
var StreamCommandMap = map[string]func(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error{
	StreamStart:    RunStreamStart,
	StreamStop:     RunStreamStop,
	StreamPause:    RunStreamPause,
	StreamResume:   RunStreamResume,
	StreamStatus:   RunStreamStatus,
	StreamTruncate: RunStreamTruncate,
	StreamMetadata: RunStreamMetadata,
	StreamCtl:      RunStreamAdvancer,
}

// StreamConfig specifies the configuration of the log backup (stream) commands.
// It embeds the common Config and adds the options that are specific to the
// individual sub-commands.
type StreamConfig struct {
	Config

	// TaskName is the name of the log backup task the command operates on.
	TaskName string `json:"task-name" toml:"task-name"`

	// StartTS usually equals the TSO of the full backup, but the user can override it.
	// EndTS is the TS after which the task stops observing (see the `end-ts` flag).
	StartTS uint64 `json:"start-ts" toml:"start-ts"`
	EndTS   uint64 `json:"end-ts" toml:"end-ts"`
	// SafePointTTL is the TTL (in seconds) of the service GC safepoint BR registers,
	// which ensures TiKV can scan entries in [startTS, currentTS] before they are GC-ed.
	SafePointTTL int64 `json:"safe-point-ttl" toml:"safe-point-ttl"`

	// Options for the command `truncate`: Until is the TS up to which log data is
	// removed, DryRun skips the real deletion, SkipPrompt suppresses confirmations,
	// and CleanUpCompactions also cleans up compaction files.
	Until              uint64 `json:"until" toml:"until"`
	DryRun             bool   `json:"dry-run" toml:"dry-run"`
	SkipPrompt         bool   `json:"skip-prompt" toml:"skip-prompt"`
	CleanUpCompactions bool   `json:"clean-up-compactions" toml:"clean-up-compactions"`

	// Option for the command `status`: print the result as JSON instead of a table.
	JSONOutput bool `json:"json-output" toml:"json-output"`

	// Options for the command `advancer`.
	AdvancerCfg advancercfg.Config `json:"advancer-config" toml:"advancer-config"`
}

// makeStorage parses the storage URL held in cfg and opens the corresponding
// external storage. Any failure is returned wrapped with a stack trace.
func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStorage, error) {
	backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
	if err != nil {
		return nil, errors.Trace(err)
	}

	storageOpts := getExternalStorageOptions(&cfg.Config, backend)
	extStorage, err := storage.New(ctx, backend, &storageOpts)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return extStorage, nil
}

// DefineStreamStartFlags defines flags used for `stream start`.
// Besides the common flags it registers the start TS, and the hidden end TS and
// GC safepoint TTL flags.
func DefineStreamStartFlags(flags *pflag.FlagSet) {
	DefineStreamCommonFlags(flags)

	flags.String(flagStreamStartTS, "",
		"usually equals last full backupTS, used for backup log. Default value is current ts.\n"+
			"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'.")
	// 999999999999999999 means 2090-11-18 22:07:45
	// The two halves of the usage text are separated explicitly; previously they
	// were glued together as "...after endTSsupport TSO...".
	flags.String(flagStreamEndTS, "999999999999999999", "end ts, indicate stopping observe after endTS.\n"+
		"support TSO or datetime")
	// MarkHidden only fails when the flag does not exist; it was registered just above.
	_ = flags.MarkHidden(flagStreamEndTS)
	flags.Int64(flagGCSafePointTTS, utils.DefaultStreamStartSafePointTTL,
		"the TTL (in seconds) that PD holds for BR's GC safepoint")
	_ = flags.MarkHidden(flagGCSafePointTTS)
}

// DefineStreamPauseFlags defines flags used for `stream pause`: the common flags
// plus the GC safepoint TTL, whose default is the pause-specific TTL.
func DefineStreamPauseFlags(flags *pflag.FlagSet) {
	const ttlUsage = "the TTL (in seconds) that PD holds for BR's GC safepoint"

	DefineStreamCommonFlags(flags)
	flags.Int64(flagGCSafePointTTS, utils.DefaultStreamPauseSafePointTTL, ttlUsage)
}

// DefineStreamCommonFlags defines the flags shared by the `stream task` commands,
// which currently is only the task name (empty by default).
func DefineStreamCommonFlags(flags *pflag.FlagSet) {
	const taskNameUsage = "The task name for the backup log task."
	flags.String(flagStreamTaskName, "", taskNameUsage)
}

// DefineStreamStatusCommonFlags defines flags used for `stream status`: the task
// name, defaulting to the wildcard, and the switch for JSON output.
func DefineStreamStatusCommonFlags(flags *pflag.FlagSet) {
	const (
		taskNameUsage = "The task name for backup stream log. If default, get status of all of tasks"
		jsonUsage     = "Print JSON as the output."
	)
	flags.String(flagStreamTaskName, stream.WildCard, taskNameUsage)
	flags.Bool(flagStreamJSONOutput, false, jsonUsage)
}

// DefineStreamTruncateLogFlags defines flags used for `stream truncate`: the
// truncation bound, dry-run mode, prompt skipping and compaction clean-up.
func DefineStreamTruncateLogFlags(flags *pflag.FlagSet) {
	const untilUsage = "Remove all backup data until this TS." +
		"(support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'.)"

	flags.String(flagUntil, "", untilUsage)
	flags.Bool(flagDryRun, false, "Run the command but don't really delete the files.")
	flags.BoolP(flagYes, "y", false, "Skip all prompts and always execute the command.")
	flags.Bool(flagCleanUpCompactions, false,
		"Clean up compaction files. Including the compacted log files and expired SST files.")
}

// ParseStreamStatusFromFlags parses parameters for `stream status`: the JSON
// output switch first, then the common stream flags.
func (cfg *StreamConfig) ParseStreamStatusFromFlags(flags *pflag.FlagSet) error {
	var err error
	if cfg.JSONOutput, err = flags.GetBool(flagStreamJSONOutput); err != nil {
		return errors.Trace(err)
	}

	err = cfg.ParseStreamCommonFromFlags(flags)
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}

// ParseStreamTruncateFromFlags parses parameters for `stream truncate`.
// The `until` value is parsed with ParseTSString; the remaining options are
// plain booleans.
func (cfg *StreamConfig) ParseStreamTruncateFromFlags(flags *pflag.FlagSet) error {
	rawUntil, err := flags.GetString(flagUntil)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.Until, err = ParseTSString(rawUntil, true)
	if err != nil {
		return errors.Trace(err)
	}

	cfg.SkipPrompt, err = flags.GetBool(flagYes)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.DryRun, err = flags.GetBool(flagDryRun)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.CleanUpCompactions, err = flags.GetBool(flagCleanUpCompactions)
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}

// ParseStreamStartFromFlags parses parameters for `stream start`: the common
// flags, the start/end TS and the GC safepoint TTL. A non-positive TTL is
// replaced by utils.DefaultStreamStartSafePointTTL.
func (cfg *StreamConfig) ParseStreamStartFromFlags(flags *pflag.FlagSet) error {
	if err := cfg.ParseStreamCommonFromFlags(flags); err != nil {
		return errors.Trace(err)
	}

	rawStartTS, err := flags.GetString(flagStreamStartTS)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.StartTS, err = ParseTSString(rawStartTS, true)
	if err != nil {
		return errors.Trace(err)
	}

	rawEndTS, err := flags.GetString(flagStreamEndTS)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.EndTS, err = ParseTSString(rawEndTS, true)
	if err != nil {
		return errors.Trace(err)
	}

	cfg.SafePointTTL, err = flags.GetInt64(flagGCSafePointTTS)
	if err != nil {
		return errors.Trace(err)
	}
	if cfg.SafePointTTL <= 0 {
		cfg.SafePointTTL = utils.DefaultStreamStartSafePointTTL
	}
	return nil
}

// ParseStreamPauseFromFlags parses parameters for `stream pause`: the common
// flags and the GC safepoint TTL. A non-positive TTL is replaced by
// utils.DefaultStreamPauseSafePointTTL.
func (cfg *StreamConfig) ParseStreamPauseFromFlags(flags *pflag.FlagSet) error {
	if err := cfg.ParseStreamCommonFromFlags(flags); err != nil {
		return errors.Trace(err)
	}

	var err error
	cfg.SafePointTTL, err = flags.GetInt64(flagGCSafePointTTS)
	if err != nil {
		return errors.Trace(err)
	}
	if cfg.SafePointTTL <= 0 {
		cfg.SafePointTTL = utils.DefaultStreamPauseSafePointTTL
	}
	return nil
}

// ParseStreamCommonFromFlags parses parameters shared by the `stream task`
// commands. The task name is mandatory; an empty one is rejected with
// ErrInvalidArgument.
func (cfg *StreamConfig) ParseStreamCommonFromFlags(flags *pflag.FlagSet) error {
	var err error
	if cfg.TaskName, err = flags.GetString(flagStreamTaskName); err != nil {
		return errors.Trace(err)
	}

	if cfg.TaskName == "" {
		return errors.Annotate(berrors.ErrInvalidArgument, "Miss parameters task-name")
	}
	return nil
}

// streamMgr bundles what the stream commands need to talk to the cluster.
// cfg and mgr are always set by NewStreamMgr; bc (the backup client, which also
// owns the external storage) and httpCli are only set when NewStreamMgr is
// called with isStreamStart = true, so methods using them must only be called
// on such a manager.
type streamMgr struct {
	cfg     *StreamConfig
	mgr     *conn.Mgr
	bc      *backup.Client
	httpCli *http.Client
}

// NewStreamMgr creates the manager used by the stream commands.
//
// It always connects to the cluster through NewMgr. When isStreamStart is true
// it additionally creates the backup client, binds it to the external storage
// described by cfg, and prepares an HTTP client for requirement checks.
//
// On any failure after the connection manager has been created, that manager
// is closed before the error is returned.
func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamStart bool) (*streamMgr, error) {
	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
		cfg.CheckRequirements, false, conn.StreamVersionChecker)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Track success explicitly instead of inspecting `err` in the deferred
	// function: the errors below live in a nested scope (`:=` inside the `if`
	// block), so a closure over the outer `err` would never observe them and
	// mgr would leak on those paths.
	succeeded := false
	defer func() {
		if !succeeded {
			mgr.Close()
		}
	}()

	// only stream start command needs Storage
	streamManager := &streamMgr{
		cfg: cfg,
		mgr: mgr,
	}
	if isStreamStart {
		client := backup.NewBackupClient(ctx, mgr)

		backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
		if err != nil {
			return nil, errors.Trace(err)
		}

		opts := storage.ExternalStorageOptions{
			NoCredentials:            cfg.NoCreds,
			SendCredentials:          cfg.SendCreds,
			CheckS3ObjectLockOptions: true,
		}
		if err = client.SetStorage(ctx, backend, &opts); err != nil {
			return nil, errors.Trace(err)
		}
		streamManager.bc = client

		// create http client to do some requirements check.
		streamManager.httpCli = httputil.NewClient(mgr.GetTLSConfig())
	}

	succeeded = true
	return streamManager, nil
}

// close closes the underlying connection manager.
func (s *streamMgr) close() {
	s.mgr.Close()
}

// checkLock reports whether the lock file (metautil.LockFile) exists in the
// storage of the backup client. It relies on s.bc, which NewStreamMgr only sets
// for the stream start command.
func (s *streamMgr) checkLock(ctx context.Context) (bool, error) {
	extStorage := s.bc.GetStorage()
	return extStorage.FileExists(ctx, metautil.LockFile)
}

// setLock asks the backup client to write the lock file. It relies on s.bc,
// which NewStreamMgr only sets for the stream start command.
func (s *streamMgr) setLock(ctx context.Context) error {
	return s.bc.SetLockFile(ctx)
}

// adjustAndCheckStartTS validates the configured time range against the current
// TS fetched from PD. A zero startTS is first replaced by the current TS. The
// check then requires startTS <= currentTS < endTS and returns
// ErrInvalidArgument otherwise.
func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error {
	nowTS, err := s.mgr.GetCurrentTsFromPD(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	cfg := s.cfg
	// default startTS to the current TS when the user did not provide one
	if cfg.StartTS == 0 {
		cfg.StartTS = nowTS
	}

	switch {
	case cfg.StartTS > nowTS:
		return errors.Annotatef(berrors.ErrInvalidArgument,
			"invalid timestamps, startTS %d should be smaller than currentTS %d",
			cfg.StartTS, nowTS)
	case nowTS >= cfg.EndTS:
		return errors.Annotatef(berrors.ErrInvalidArgument,
			"invalid timestamps, endTS %d should be larger than currentTS %d",
			cfg.EndTS, nowTS)
	}
	return nil
}

// checkImportTaskRunning returns an error when any import (lightning/restore)
// task is registered in etcd, and nil when the task list is empty.
func (s *streamMgr) checkImportTaskRunning(ctx context.Context, etcdCLI *clientv3.Client) error {
	tasks, err := utils.GetImportTasksFrom(ctx, etcdCLI)
	if err != nil {
		return errors.Trace(err)
	}
	if tasks.Empty() {
		return nil
	}
	return errors.Errorf("There are some lightning/restore tasks running: %s"+
		"please stop or wait finishing at first. "+
		"If the lightning/restore task is forced to terminate by system, "+
		"please wait for ttl to decrease to 0.", tasks.MessageToUser())
}

// setGCSafePoint registers the given service safe point to PD. It first runs
// utils.CheckGCSafePoint on sp.BackupTS and only updates the service safe point
// when that check passes.
func (s *streamMgr) setGCSafePoint(ctx context.Context, sp utils.BRServiceSafePoint) error {
	pdClient := s.mgr.GetPDClient()
	if err := utils.CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil {
		return errors.Annotatef(err,
			"failed to check gc safePoint, ts %v", sp.BackupTS)
	}

	if err := utils.UpdateServiceSafePoint(ctx, s.mgr.GetPDClient(), sp); err != nil {
		return errors.Trace(err)
	}

	log.Info("set stream safePoint", zap.Object("safePoint", sp))
	return nil
}

// buildObserveRanges returns the key ranges the log backup task observes: the
// meta range plus the data ranges selected by the table filter, sorted by
// start key.
func (s *streamMgr) buildObserveRanges() ([]kv.KeyRange, error) {
	dataRanges, err := stream.BuildObserveDataRanges(
		s.mgr.GetStorage(),
		s.cfg.FilterStr,
		s.cfg.TableFilter,
		s.cfg.StartTS,
	)
	if err != nil {
		return nil, errors.Trace(err)
	}

	metaRange := stream.BuildObserveMetaRange()
	ranges := make([]kv.KeyRange, 0, len(dataRanges)+1)
	ranges = append(ranges, *metaRange)
	ranges = append(ranges, dataRanges...)

	byStartKey := func(a, b kv.KeyRange) int {
		return bytes.Compare(a.StartKey, b.StartKey)
	}
	slices.SortFunc(ranges, byStartKey)
	return ranges, nil
}

// backupFullSchemas writes a backupmeta file to the external storage that
// records the log start TS, the cluster ID and the cluster version.
func (s *streamMgr) backupFullSchemas(ctx context.Context) error {
	version, err := s.mgr.GetClusterVersion(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	writer := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, true, metautil.MetaFile, nil)
	fillMeta := func(meta *backuppb.BackupMeta) {
		// the log startTS is saved as the start version of the backupmeta
		meta.StartVersion = s.cfg.StartTS
		meta.ClusterId = s.bc.GetClusterID()
		meta.ClusterVersion = version
	}
	writer.Update(fillMeta)

	err = writer.FlushBackupMeta(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}

// checkStreamStartEnable verifies that log backup is enabled on the cluster and
// returns an error telling the user how to enable it otherwise.
func (s *streamMgr) checkStreamStartEnable(ctx context.Context) error {
	enabled, err := s.mgr.IsLogBackupEnabled(ctx, s.httpCli)
	if err != nil {
		return errors.Trace(err)
	}
	if enabled {
		return nil
	}
	return errors.New("Unable to create task about log-backup. " +
		"please set TiKV config `log-backup.enable` to true and restart TiKVs.")
}

// RestoreFunc is the callback returned by KeepGcDisabled; calling it sets the
// GC ratio to the given value.
type RestoreFunc func(string) error

// KeepGcDisabled disables GC and returns a function that can be used to set the
// GC ratio again, together with the ratio that was in effect before.
// gc.ratio-threshold = "-1.0" represents disabled gc in TiKV.
func KeepGcDisabled(g glue.Glue, store kv.Storage) (RestoreFunc, string, error) {
	session, err := g.CreateSession(store)
	if err != nil {
		return nil, "", errors.Trace(err)
	}
	executor := session.GetSessionCtx().GetRestrictedSQLExecutor()

	previousRatio, err := utils.GetGcRatio(executor)
	if err != nil {
		return nil, "", errors.Trace(err)
	}

	if err = utils.SetGcRatio(executor, utils.DisabledGcRatioVal); err != nil {
		return nil, "", errors.Trace(err)
	}

	// The callback reuses the executor of the session created above.
	var resetRatio RestoreFunc = func(ratio string) error {
		return utils.SetGcRatio(executor, ratio)
	}
	return resetRatio, previousRatio, nil
}

// RunStreamCommand runs any of the `stream task` commands. It adjusts the
// config, dispatches through StreamCommandMap and records the result in the
// summary. The summary is printed on return unless the command is listed in
// skipSummaryCommandList.
func RunStreamCommand(
	ctx context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	cfg.Config.adjust()
	defer func() {
		if _, skip := skipSummaryCommandList[cmdName]; skip {
			return
		}
		summary.Summary(cmdName)
	}()

	run, known := StreamCommandMap[cmdName]
	if !known {
		return errors.Annotatef(berrors.ErrInvalidArgument, "invalid command %s", cmdName)
	}

	err := run(ctx, g, cmdName, cfg)
	if err == nil {
		summary.SetSuccessStatus(true)
		return nil
	}

	log.Error("failed to run stream command", zap.String("command", cmdName), zap.Error(err))
	summary.SetSuccessStatus(false)
	summary.CollectFailureUnit(cmdName, err)
	return err
}

// RunStreamStart starts a log backup (stream) task.
//
// The steps are: verify that log backup is enabled and that the time range is
// valid; refuse to start while an import/restore task or another log backup
// task exists; register the GC safepoint; on a fresh storage, write the lock
// file and the backupmeta; finally put the task into etcd.
//
// When the lock file already exists the task is treated as a restart: the start
// TS is replaced by the max TS found in the existing log, and the storage must
// belong to the current cluster.
func RunStreamStart(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	if parent := opentracing.SpanFromContext(ctx); parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamStart", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	sm, err := NewStreamMgr(ctx, cfg, g, true)
	if err != nil {
		return errors.Trace(err)
	}
	defer sm.close()

	if err = sm.checkStreamStartEnable(ctx); err != nil {
		return errors.Trace(err)
	}
	if err = sm.adjustAndCheckStartTS(ctx); err != nil {
		return errors.Trace(err)
	}

	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}
	cli := streamhelper.NewMetaDataClient(etcdCLI)
	defer func() {
		if closeErr := cli.Close(); closeErr != nil {
			log.Warn("failed to close etcd client", zap.Error(closeErr))
		}
	}()

	// Log backup must not be started while an import/restore task is ongoing.
	if err = sm.checkImportTaskRunning(ctx, cli.Client); err != nil {
		return errors.Trace(err)
	}

	// Only a single stream log task is supported currently.
	taskCount, err := cli.GetTaskCount(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	if taskCount > 0 {
		return errors.Annotate(berrors.ErrStreamLogTaskExist, "failed to start the log backup, allow only one running task")
	}

	// make sure external file lock is available
	locked, err := sm.checkLock(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	// An existing lock means this is a restart of a stream task: continue from
	// where the existing log ends.
	if locked {
		logInfo, err := getLogRange(ctx, &cfg.Config)
		if err != nil {
			return errors.Trace(err)
		}
		if logInfo.clusterID > 0 && logInfo.clusterID != sm.bc.GetClusterID() {
			return errors.Annotatef(berrors.ErrInvalidArgument,
				"the stream log files from cluster ID:%v and current cluster ID:%v ",
				logInfo.clusterID, sm.bc.GetClusterID())
		}
		cfg.StartTS = logInfo.logMaxTS
	}

	// Both the restart and the fresh-start path register the safepoint at the
	// (possibly just adjusted) start TS.
	safePoint := utils.BRServiceSafePoint{
		ID:       utils.MakeSafePointID(),
		TTL:      cfg.SafePointTTL,
		BackupTS: cfg.StartTS,
	}
	if err = sm.setGCSafePoint(ctx, safePoint); err != nil {
		return errors.Trace(err)
	}

	// A new stream task additionally creates the lock file and the backupmeta.
	if !locked {
		if err = sm.setLock(ctx); err != nil {
			return errors.Trace(err)
		}
		if err = sm.backupFullSchemas(ctx); err != nil {
			return errors.Trace(err)
		}
	}

	ranges, err := sm.buildObserveRanges()
	if err != nil {
		return errors.Trace(err)
	}
	if len(ranges) == 0 {
		// nothing to backup
		pdAddress := strings.Join(cfg.PD, ",")
		log.Warn("Nothing to observe, maybe connected to cluster for restoring",
			zap.String("PD address", pdAddress))
		return errors.Annotate(berrors.ErrInvalidArgument, "nothing need to observe")
	}

	securityConfig := generateSecurityConfig(cfg)
	taskInfo := streamhelper.TaskInfo{
		PBInfo: backuppb.StreamBackupTaskInfo{
			Storage:         sm.bc.GetStorageBackend(),
			StartTs:         cfg.StartTS,
			EndTs:           cfg.EndTS,
			Name:            cfg.TaskName,
			TableFilter:     cfg.FilterStr,
			CompressionType: backuppb.CompressionType_ZSTD,
			SecurityConfig:  &securityConfig,
		},
		Ranges:  ranges,
		Pausing: false,
	}
	if err = cli.PutTask(ctx, taskInfo); err != nil {
		return errors.Trace(err)
	}
	summary.Log(cmdName, taskInfo.ZapTaskInfo()...)
	return nil
}

// generateSecurityConfig builds the security config of the stream task.
// A plaintext data key takes precedence over a master key config; when neither
// is set with an effective encryption method, the empty config is returned.
func generateSecurityConfig(cfg *StreamConfig) backuppb.StreamBackupTaskSecurityConfig {
	switch {
	case len(cfg.LogBackupCipherInfo.CipherKey) > 0 &&
		utils.IsEffectiveEncryptionMethod(cfg.LogBackupCipherInfo.CipherType):
		dataKey := &backuppb.CipherInfo{
			CipherType: cfg.LogBackupCipherInfo.CipherType,
			CipherKey:  cfg.LogBackupCipherInfo.CipherKey,
		}
		return backuppb.StreamBackupTaskSecurityConfig{
			Encryption: &backuppb.StreamBackupTaskSecurityConfig_PlaintextDataKey{
				PlaintextDataKey: dataKey,
			},
		}
	case len(cfg.MasterKeyConfig.MasterKeys) > 0 &&
		utils.IsEffectiveEncryptionMethod(cfg.MasterKeyConfig.EncryptionType):
		masterKey := &backuppb.MasterKeyConfig{
			EncryptionType: cfg.MasterKeyConfig.EncryptionType,
			MasterKeys:     cfg.MasterKeyConfig.MasterKeys,
		}
		return backuppb.StreamBackupTaskSecurityConfig{
			Encryption: &backuppb.StreamBackupTaskSecurityConfig_MasterKeyConfig{
				MasterKeyConfig: masterKey,
			},
		}
	default:
		return backuppb.StreamBackupTaskSecurityConfig{}
	}
}

// RunStreamMetadata reads the TS range covered by the log backup in the
// external storage and logs its min/max TS, both raw and formatted as dates,
// to the summary.
func RunStreamMetadata(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	if parent := opentracing.SpanFromContext(ctx); parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamCheckLog", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	logInfo, err := getLogRange(ctx, &cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}

	minDate := stream.FormatDate(oracle.GetTimeFromTS(logInfo.logMinTS))
	maxDate := stream.FormatDate(oracle.GetTimeFromTS(logInfo.logMaxTS))
	summary.Log(
		cmdName,
		zap.Uint64("log-min-ts", logInfo.logMinTS),
		zap.String("log-min-date", minDate),
		zap.Uint64("log-max-ts", logInfo.logMaxTS),
		zap.String("log-max-date", maxDate),
	)
	return nil
}

// RunStreamStop stops a stream task: it deletes the task from etcd and then
// removes the service safe point named after the task. Failing to remove the
// safe point is only logged, because the task is already gone at that point.
func RunStreamStop(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	if parent := opentracing.SpanFromContext(ctx); parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamStop", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	sm, err := NewStreamMgr(ctx, cfg, g, false)
	if err != nil {
		return errors.Trace(err)
	}
	defer sm.close()

	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}
	cli := streamhelper.NewMetaDataClient(etcdCLI)
	defer func() {
		if closeErr := cli.Close(); closeErr != nil {
			log.Warn("failed to close etcd client", zap.Error(closeErr))
		}
	}()

	// to add backoff
	task, err := cli.GetTask(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}

	err = cli.DeleteTask(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}

	removal := utils.BRServiceSafePoint{
		ID:       buildPauseSafePointName(task.Info.Name),
		TTL:      0, // 0 means remove this service safe point.
		BackupTS: math.MaxUint64,
	}
	if rmErr := sm.setGCSafePoint(ctx, removal); rmErr != nil {
		log.Warn("failed to remove safe point", zap.String("error", rmErr.Error()))
	}

	summary.Log(cmdName, logutil.StreamBackupTaskInfo(&task.Info))
	return nil
}

// RunStreamPause pauses a stream task. Before marking the task as paused it
// registers a service safe point at the task's global checkpoint with the
// configured TTL. Pausing an already paused task is an error.
func RunStreamPause(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	if parent := opentracing.SpanFromContext(ctx); parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamPause", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	sm, err := NewStreamMgr(ctx, cfg, g, false)
	if err != nil {
		return errors.Trace(err)
	}
	defer sm.close()

	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}
	cli := streamhelper.NewMetaDataClient(etcdCLI)
	defer func() {
		if closeErr := cli.Close(); closeErr != nil {
			log.Warn("failed to close etcd client", zap.Error(closeErr))
		}
	}()

	// to add backoff
	task, paused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}
	if paused {
		return errors.Annotatef(berrors.ErrKVUnknown, "The task %s is paused already.", cfg.TaskName)
	}

	checkpointTS, err := task.GetGlobalCheckPointTS(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	pauseSafePoint := utils.BRServiceSafePoint{
		ID:       buildPauseSafePointName(task.Info.Name),
		TTL:      cfg.SafePointTTL,
		BackupTS: checkpointTS,
	}
	if err = sm.setGCSafePoint(ctx, pauseSafePoint); err != nil {
		return errors.Trace(err)
	}

	if err = cli.PauseTask(ctx, cfg.TaskName); err != nil {
		return errors.Trace(err)
	}

	summary.Log(cmdName, logutil.StreamBackupTaskInfo(&task.Info))
	return nil
}

// RunStreamResume resumes a paused stream task. It refuses to resume when the
// task is not paused or when the GC safepoint check on the task's global
// checkpoint fails. After resuming it clears the last error of the task and
// updates the pause safe point with the default start TTL; a failure of that
// last step is only logged.
func RunStreamResume(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	if parent := opentracing.SpanFromContext(ctx); parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamResume", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	sm, err := NewStreamMgr(ctx, cfg, g, false)
	if err != nil {
		return errors.Trace(err)
	}
	defer sm.close()

	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}
	cli := streamhelper.NewMetaDataClient(etcdCLI)
	defer func() {
		if closeErr := cli.Close(); closeErr != nil {
			log.Warn("failed to close etcd client", zap.Error(closeErr))
		}
	}()

	// to add backoff
	task, paused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
	if err != nil {
		return errors.Trace(err)
	}
	if !paused {
		return errors.Annotatef(berrors.ErrKVUnknown,
			"The task %s is active already.", cfg.TaskName)
	}

	checkpointTS, err := task.GetGlobalCheckPointTS(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	if err = utils.CheckGCSafePoint(ctx, sm.mgr.GetPDClient(), checkpointTS); err != nil {
		return errors.Annotatef(err, "the global checkpoint ts: %v(%s) has been gc. ",
			checkpointTS, oracle.GetTimeFromTS(checkpointTS))
	}

	if err = cli.ResumeTask(ctx, cfg.TaskName); err != nil {
		return errors.Trace(err)
	}

	if err = cli.CleanLastErrorOfTask(ctx, cfg.TaskName); err != nil {
		return err
	}

	resumeSafePoint := utils.BRServiceSafePoint{
		ID:       buildPauseSafePointName(task.Info.Name),
		TTL:      utils.DefaultStreamStartSafePointTTL,
		BackupTS: checkpointTS,
	}
	if spErr := sm.setGCSafePoint(ctx, resumeSafePoint); spErr != nil {
		log.Warn("failed to remove safe point",
			zap.Uint64("safe-point", checkpointTS), zap.String("error", spErr.Error()))
	}

	summary.Log(cmdName, logutil.StreamBackupTaskInfo(&task.Info))
	return nil
}

// RunStreamAdvancer runs the checkpoint advancer daemon from the command line
// and blocks until the daemon loop returns.
//
// When cfg.AdvancerCfg.OwnershipCycleInterval is positive, the advancer first
// forces itself to be the owner and then periodically retires from and retakes
// ownership in a background goroutine that is bound to ctx.
//
// The connection manager and the etcd client created here are released when
// the function returns, consistent with the other Run* commands in this file.
func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
	ctx, cancel := context.WithCancel(c)
	defer cancel()
	log.Info("starting", zap.String("cmd", cmdName))

	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
		cfg.CheckRequirements, false, conn.StreamVersionChecker)
	if err != nil {
		return err
	}
	// Previously mgr was never closed, leaking its connections on every exit path.
	defer mgr.Close()

	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return err
	}
	// Registered before the owner manager's deferred Close so that, in LIFO
	// order, the owner manager is shut down while the etcd client is still open.
	defer func() {
		if closeErr := etcdCLI.Close(); closeErr != nil {
			log.Warn("failed to close etcd client", zap.Error(closeErr))
		}
	}()

	env := streamhelper.CliEnv(mgr.StoreManager, mgr.GetStore(), etcdCLI)
	advancer := streamhelper.NewCheckpointAdvancer(env)
	advancer.UpdateConfig(cfg.AdvancerCfg)
	ownerMgr := streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI)
	defer func() {
		ownerMgr.Close()
	}()
	advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration)
	loop, err := advancerd.Begin(ctx)
	if err != nil {
		return err
	}
	if cfg.AdvancerCfg.OwnershipCycleInterval > 0 {
		err = advancerd.ForceToBeOwner(ctx)
		if err != nil {
			return err
		}
		log.Info("command line advancer forced to be the owner")
		// The goroutine exits when ctx is cancelled, which happens at the latest
		// when this function returns.
		go runOwnershipCycle(ctx, advancerd, cfg.AdvancerCfg.OwnershipCycleInterval, true)
	}
	loop()
	return nil
}

// runOwnershipCycle toggles the ownership of the advancer on every tick of
// cycleDuration until ctx is done. isOwner is the ownership state at the time
// of the call. When forcing ownership fails, the state is left unchanged and
// the attempt is repeated on the next tick.
func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycleDuration time.Duration, isOwner bool) {
	ticker := time.NewTicker(cycleDuration)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		if isOwner {
			// currently the owner: step down for this cycle
			advancerd.RetireIfOwner()
			log.Info("command line advancer retired from being owner")
			isOwner = false
			continue
		}

		// currently not the owner: try to take the ownership back
		if err := advancerd.ForceToBeOwner(ctx); err != nil {
			log.Error("command line advancer failed to force ownership", zap.Error(err))
			continue
		}
		log.Info("command line advancer forced to be the owner")
		isOwner = true
	}
}

// checkConfigForStatus verifies that at least one PD address is configured,
// which the status command requires.
func checkConfigForStatus(pd []string) error {
	if len(pd) > 0 {
		return nil
	}
	return errors.Annotatef(berrors.ErrInvalidArgument,
		"the command needs access to PD, please specify `-u` or `--pd`")
}

// makeStatusController makes the status controller from the given config.
// This would better live in the `stream` package, but that is impossible
// because of cyclic requirements.
//
// The controller is built from the etcd metadata client, the connection
// manager and a printer chosen by cfg.JSONOutput. If creating the connection
// manager fails, the already opened etcd client is closed before returning so
// it does not leak.
func makeStatusController(ctx context.Context, cfg *StreamConfig, g glue.Glue) (*stream.StatusController, error) {
	console := glue.GetConsole(g)
	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return nil, err
	}
	cli := streamhelper.NewMetaDataClient(etcdCLI)
	var printer stream.TaskPrinter
	if !cfg.JSONOutput {
		printer = stream.PrintTaskByTable(console)
	} else {
		printer = stream.PrintTaskWithJSON(console)
	}
	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
		cfg.CheckRequirements, false, conn.StreamVersionChecker)
	if err != nil {
		// No controller is handed to the caller on this path, so nobody else
		// would close the etcd client.
		if closeErr := cli.Close(); closeErr != nil {
			log.Warn("failed to close etcd client", zap.Error(closeErr))
		}
		return nil, err
	}
	return stream.NewStatusController(cli, mgr, printer), nil
}

// RunStreamStatus prints the status of the stream task named cfg.TaskName.
// It requires at least one PD address in the config.
func RunStreamStatus(
	c context.Context,
	g glue.Glue,
	cmdName string,
	cfg *StreamConfig,
) error {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	if parent := opentracing.SpanFromContext(ctx); parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("task.RunStreamStatus", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	err := checkConfigForStatus(cfg.PD)
	if err != nil {
		return err
	}

	controller, err := makeStatusController(ctx, cfg, g)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := controller.Close(); closeErr != nil {
			log.Warn("failed to close etcd client", zap.Error(closeErr))
		}
	}()

	return controller.PrintStatusOfTask(ctx, cfg.TaskName)
}

// RunStreamTruncate truncates the log that belong to (0, until-ts)
//
// The flow is:
//  1. reject the call when cfg.Until is 0;
//  2. open the external storage and take the remote truncate lock;
//  3. read the recorded truncate safepoint and ask for confirmation when
//     cfg.Until is behind it;
//  4. when cfg.CleanUpCompactions is set, run the migration based truncation
//     and return;
//  5. otherwise load the metadata, show how many files will be removed, ask for
//     confirmation, advance the safepoint file and remove the files.
//
// cfg.SkipPrompt suppresses the confirmations of steps 3 and 5 (the prompt in
// step 4 is always shown) and cfg.DryRun prevents the safepoint file from being
// written. cmdName is not used by this function.
func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) (err error) {
	console := glue.GetConsole(g)
	// em / warn render their arguments as emphasized strings for console output.
	em := color.New(color.Bold).SprintFunc()
	warn := color.New(color.Bold, color.FgHiRed).SprintFunc()
	formatTS := func(ts uint64) string {
		return oracle.GetTimeFromTS(ts).Format("2006-01-02 15:04:05.0000")
	}
	if cfg.Until == 0 {
		return errors.Annotatef(berrors.ErrInvalidArgument, "please provide the `--until` ts")
	}

	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	extStorage, err := cfg.makeStorage(ctx)
	if err != nil {
		return err
	}
	// Hold the remote truncate lock for the rest of the function.
	lock, err := storage.TryLockRemote(ctx, extStorage, truncateLockPath, hintOnTruncateLock)
	if err != nil {
		return err
	}
	// The unlock receives a pointer to the named result `err`.
	// NOTE(review): presumably an unlock failure is merged into `err` and the
	// 10s bounds the cleanup — confirm against utils.WithCleanUp.
	defer utils.WithCleanUp(&err, 10*time.Second, func(ctx context.Context) error {
		return lock.Unlock(ctx)
	})

	// sp is the TS stored in the truncate safepoint file.
	sp, err := stream.GetTSFromFile(ctx, extStorage, stream.TruncateSafePointFileName)
	if err != nil {
		return err
	}

	if cfg.Until < sp {
		console.Println("According to the log, you have truncated log backup data before", em(formatTS(sp)))
		if !cfg.SkipPrompt && !console.PromptBool("Continue? ") {
			return nil
		}
	}

	if cfg.CleanUpCompactions {
		est := stream.MigerationExtension(extStorage)
		est.Hooks = stream.NewProgressBarHooks(console)
		newSN := math.MaxInt
		// Show the merged migration to the user and let them confirm it.
		optPrompt := stream.MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool {
			console.Println("We are going to do the following: ")
			tbl := console.CreateTable()
			stream.AddMigrationToTable(m, tbl)
			tbl.Print()
			return console.PromptBool("Continue? ")
		})
		optAppend := stream.MMOptAppendPhantomMigration(backuppb.Migration{TruncatedTo: cfg.Until})
		opts := []stream.MergeAndMigrateToOpt{optPrompt, optAppend, stream.MMOptAlwaysRunTruncate()}
		var res stream.MergeAndMigratedTo
		if cfg.DryRun {
			est.DryRun(func(me stream.MigrationExt) {
				res = me.MergeAndMigrateTo(ctx, newSN, opts...)
			})
		} else {
			res = est.MergeAndMigrateTo(ctx, newSN, opts...)
		}
		// Warnings are only printed; this branch always returns nil.
		if len(res.Warnings) > 0 {
			glue.PrintList(console, "the following errors happened", res.Warnings, 10)
		}
		return nil
	}

	readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost())
	metas := stream.StreamMetadataSet{
		MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize,
		Helper:                    stream.NewMetadataHelper(),
		DryRun:                    cfg.DryRun,
	}
	// shiftUntilTS, not cfg.Until, is the bound used for selecting and removing files below.
	shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, extStorage, cfg.Until)
	if err != nil {
		return err
	}
	readMetaDone()

	var (
		fileCount int    = 0
		kvCount   int64  = 0
		totalSize uint64 = 0
	)

	// Accumulate the totals shown to the user before anything is removed.
	metas.IterateFilesFullyBefore(shiftUntilTS, func(d *stream.FileGroupInfo) (shouldBreak bool) {
		fileCount++
		totalSize += d.Length
		kvCount += d.KVCount
		// The naked return yields shouldBreak == false.
		return
	})
	console.Printf("We are going to truncate %s files, up to TS %s.\n",
		em(fileCount),
		em(formatTS(cfg.Until)),
	)
	if !cfg.SkipPrompt && !console.PromptBool(warn("Are you sure?")) {
		return nil
	}

	// Advance the safepoint file before removing data; it is never moved backwards
	// and never written in dry-run mode.
	if cfg.Until > sp && !cfg.DryRun {
		if err := stream.SetTSToFile(
			ctx, extStorage, cfg.Until, stream.TruncateSafePointFileName); err != nil {
			return err
		}
	}

	// begin to remove
	p := console.StartProgressBar(
		"Truncating Data Files and Metadata", fileCount,
		glue.WithTimeCost(),
		glue.WithConstExtraField("kv-count", kvCount),
		glue.WithConstExtraField("kv-size", fmt.Sprintf("%d(%s)", totalSize, units.HumanSize(float64(totalSize)))),
	)
	defer p.Close()

	notDeleted, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, extStorage, p.IncBy)
	if err != nil {
		return err
	}

	if err := p.Wait(ctx); err != nil {
		return err
	}

	// Report the items that could not be removed, printing at most keepFirstNFailure of them.
	if len(notDeleted) > 0 {
		const keepFirstNFailure = 16
		console.Println("Files below are not deleted due to error, you may clear it manually, check log for detail error:")
		console.Println("- Total", em(len(notDeleted)), "items.")
		if len(notDeleted) > keepFirstNFailure {
			console.Println("-", em(len(notDeleted)-keepFirstNFailure), "items omitted.")
			// TODO: maybe don't add them at the very first.
			notDeleted = notDeleted[:keepFirstNFailure]
		}
		for _, f := range notDeleted {
			console.Println(f)
		}
	}

	return nil
}

// checkTaskExists reports an error when any log backup task is registered.
//
// It first validates that cfg.PD is set, then lists all tasks through a metadata
// client built on etcdCLI. The returned error names the first task found.
func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.Client) error {
	if err := checkConfigForStatus(cfg.PD); err != nil {
		return err
	}

	// check log backup task
	tasks, err := streamhelper.NewMetaDataClient(etcdCLI).GetAllTasks(ctx)
	if err != nil {
		return err
	}
	if len(tasks) == 0 {
		return nil
	}

	return errors.Errorf("log backup task is running: %s, "+
		"please stop the task before restore, and after PITR operation finished, "+
		"create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name)
}

// checkIncompatibleChangefeed returns an error when cdcutil reports a non-empty set
// of changefeeds incompatible with backupTS; the error text starts with the
// set's user-facing message.
func checkIncompatibleChangefeed(ctx context.Context, backupTS uint64, etcdCLI *clientv3.Client) error {
	incompatible, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(ctx, etcdCLI, backupTS)
	if err != nil {
		return err
	}
	if incompatible.Empty() {
		return nil
	}
	return errors.Errorf("%splease remove changefeed(s) before restore", incompatible.MessageToUser())
}

// RunStreamRestore restores stream log.
//
// It is the entry of point-in-time restore: it resolves the restorable log range
// from the log storage, optionally restores the full snapshot found at
// cfg.FullBackupStorage, and then restores the log on top of it.
//
// cfg is mutated: RestoreTS (when it was 0), upstreamClusterID, StartTS (when a
// full backup storage is given), tiflashRecorder and, temporarily, Config.Storage.
func RunStreamRestore(
	c context.Context,
	mgr *conn.Mgr,
	g glue.Glue,
	cfg *RestoreConfig,
) (err error) {
	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	// Attach a child tracing span when the incoming context carries one.
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("task.RunStreamRestore", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}
	_, s, err := GetStorage(ctx, cfg.Config.Storage, &cfg.Config)
	if err != nil {
		return errors.Trace(err)
	}
	// logInfo carries the min/max TS covered by the log backup and its cluster ID.
	logInfo, err := getLogRangeWithStorage(ctx, s)
	if err != nil {
		return errors.Trace(err)
	}

	// if not set by user, restore to the max TS available
	if cfg.RestoreTS == 0 {
		cfg.RestoreTS = logInfo.logMaxTS
	}
	cfg.upstreamClusterID = logInfo.clusterID

	if len(cfg.FullBackupStorage) > 0 {
		startTS, fullClusterID, err := getFullBackupTS(ctx, cfg)
		if err != nil {
			return errors.Trace(err)
		}
		// The cluster IDs are only compared when both sides recorded one (> 0).
		if logInfo.clusterID > 0 && fullClusterID > 0 && logInfo.clusterID != fullClusterID {
			return errors.Annotatef(berrors.ErrInvalidArgument,
				"the full snapshot(from cluster ID:%v) and log(from cluster ID:%v) come from different cluster.",
				fullClusterID, logInfo.clusterID)
		}

		// The log restore starts from the TS of the full backup, which must not
		// be older than the earliest TS the log still covers.
		cfg.StartTS = startTS
		if cfg.StartTS < logInfo.logMinTS {
			return errors.Annotatef(berrors.ErrInvalidArgument,
				"it has gap between full backup ts:%d(%s) and log backup ts:%d(%s). ",
				cfg.StartTS, oracle.GetTimeFromTS(cfg.StartTS),
				logInfo.logMinTS, oracle.GetTimeFromTS(logInfo.logMinTS))
		}
	}

	log.Info("start point in time restore",
		zap.Uint64("restore-from", cfg.StartTS), zap.Uint64("restore-to", cfg.RestoreTS),
		zap.Uint64("log-min-ts", logInfo.logMinTS), zap.Uint64("log-max-ts", logInfo.logMaxTS))
	if err := checkLogRange(cfg.StartTS, cfg.RestoreTS, logInfo.logMinTS, logInfo.logMaxTS); err != nil {
		return errors.Trace(err)
	}

	// checkInfo tells whether the full restore is still needed and carries the
	// checkpoint info (if any) of a previous run.
	checkInfo, err := checkPiTRTaskInfo(ctx, mgr, g, cfg)
	if err != nil {
		return errors.Trace(err)
	}

	failpoint.Inject("failed-before-full-restore", func(_ failpoint.Value) {
		failpoint.Return(errors.New("failpoint: failed before full restore"))
	})

	recorder := tiflashrec.New()
	cfg.tiflashRecorder = recorder
	// restore full snapshot.
	if checkInfo.NeedFullRestore {
		// runSnapshotRestore runs with cfg.Config.Storage pointing at the full
		// backup, so swap it in and put the log storage back afterwards.
		// NOTE(review): the log storage is not put back when runSnapshotRestore fails.
		logStorage := cfg.Config.Storage
		cfg.Config.Storage = cfg.FullBackupStorage
		// TiFlash replica is restored to down-stream on 'pitr' currently.
		if err = runSnapshotRestore(ctx, mgr, g, FullRestoreCmd, cfg, checkInfo); err != nil {
			return errors.Trace(err)
		}
		cfg.Config.Storage = logStorage
	} else if len(cfg.FullBackupStorage) > 0 {
		// A full backup was given but is skipped: tell the user, and reload the
		// TiFlash records kept in the checkpoint metadata when there are any.
		skipMsg := []byte(fmt.Sprintf("%s command is skipped due to checkpoint mode for restore\n", FullRestoreCmd))
		if _, err := glue.GetConsole(g).Out().Write(skipMsg); err != nil {
			return errors.Trace(err)
		}
		if checkInfo.CheckpointInfo != nil && checkInfo.CheckpointInfo.Metadata != nil && checkInfo.CheckpointInfo.Metadata.TiFlashItems != nil {
			log.Info("load tiflash records of snapshot restore from checkpoint")
			cfg.tiflashRecorder.Load(checkInfo.CheckpointInfo.Metadata.TiFlashItems)
		}
	}
	// restore log.
	cfg.adjustRestoreConfigForStreamRestore()
	if err := restoreStream(ctx, mgr, g, cfg, checkInfo.CheckpointInfo); err != nil {
		return errors.Trace(err)
	}
	return nil
}

// restoreStream runs the log restore stage of a point-in-time restore.
//
// It creates a log restore client, fixes the rewrite TS (reused from taskInfo when
// the checkpoint carries metadata, otherwise fetched from PD), keeps GC disabled,
// restores the meta (DDL) files, then the compacted SST files and the KV log
// files, and finally runs the follow-up steps (schema reload wait, clean up,
// GC rows, ingest index repair, TiFlash replica DDLs).
//
// A summary is logged on return, for both the success and the failure case.
func restoreStream(
	c context.Context,
	mgr *conn.Mgr,
	g glue.Glue,
	cfg *RestoreConfig,
	taskInfo *checkpoint.CheckpointTaskInfoForLogRestore,
) (err error) {
	var (
		totalKVCount           uint64
		totalSize              uint64
		checkpointTotalKVCount uint64
		checkpointTotalSize    uint64
		currentTS              uint64
		// mu guards the four counters above, which are updated from callbacks.
		mu        sync.Mutex
		startTime = time.Now()
	)
	// Log the restore summary according to the final value of the named result `err`.
	defer func() {
		if err != nil {
			summary.Log("restore log failed summary", zap.Error(err))
		} else {
			totalDureTime := time.Since(startTime)
			summary.Log("restore log success summary", zap.Duration("total-take", totalDureTime),
				zap.Uint64("source-start-point", cfg.StartTS),
				zap.Uint64("source-end-point", cfg.RestoreTS),
				zap.Uint64("target-end-point", currentTS),
				zap.String("source-start", stream.FormatDate(oracle.GetTimeFromTS(cfg.StartTS))),
				zap.String("source-end", stream.FormatDate(oracle.GetTimeFromTS(cfg.RestoreTS))),
				zap.String("target-end", stream.FormatDate(oracle.GetTimeFromTS(currentTS))),
				zap.Uint64("total-kv-count", totalKVCount),
				zap.Uint64("skipped-kv-count-by-checkpoint", checkpointTotalKVCount),
				zap.String("total-size", units.HumanSize(float64(totalSize))),
				zap.String("skipped-size-by-checkpoint", units.HumanSize(float64(checkpointTotalSize))),
				zap.String("average-speed", units.HumanSize(float64(totalSize)/totalDureTime.Seconds())+"/s"),
			)
		}
	}()

	ctx, cancelFn := context.WithCancel(c)
	defer cancelFn()

	// tweakLocalConfForRestore returns the function that is deferred here.
	// NOTE(review): presumably it undoes the tweak — confirm.
	restoreCfg := tweakLocalConfForRestore()
	defer restoreCfg()

	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan(
			"restoreStream",
			opentracing.ChildOf(span.Context()),
		)
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}

	client, err := createRestoreClient(ctx, g, cfg, mgr)
	if err != nil {
		return errors.Annotate(err, "failed to create restore client")
	}
	defer client.Close(ctx)

	if taskInfo != nil && taskInfo.Metadata != nil {
		// reuse the task's rewrite ts
		log.Info("reuse the task's rewrite ts", zap.Uint64("rewrite-ts", taskInfo.Metadata.RewriteTS))
		currentTS = taskInfo.Metadata.RewriteTS
	} else {
		currentTS, err = restore.GetTSWithRetry(ctx, mgr.GetPDClient())
		if err != nil {
			return errors.Trace(err)
		}
	}
	if err := client.SetCurrentTS(currentTS); err != nil {
		return errors.Trace(err)
	}

	importModeSwitcher := restore.NewImportModeSwitcher(mgr.GetPDClient(), cfg.Config.SwitchModeInterval, mgr.GetTLSConfig())
	restoreSchedulers, _, err := restore.RestorePreWork(ctx, mgr, importModeSwitcher, cfg.Online, false)
	if err != nil {
		return errors.Trace(err)
	}
	// Always run the post-work even on error, so we don't stuck in the import
	// mode or emptied schedulers
	defer restore.RestorePostWork(ctx, importModeSwitcher, restoreSchedulers, cfg.Online)

	// It need disable GC in TiKV when PiTR.
	// because the process of PITR is concurrent and kv events isn't sorted by tso.
	restoreGc, oldRatio, err := KeepGcDisabled(g, mgr.GetStorage())
	if err != nil {
		return errors.Trace(err)
	}
	// gcDisabledRestorable is only set to true at the very end of a successful run.
	gcDisabledRestorable := false
	defer func() {
		// don't restore the gc-ratio-threshold if checkpoint mode is used and restored is not finished
		if cfg.UseCheckpoint && !gcDisabledRestorable {
			log.Info("skip restore the gc-ratio-threshold for next retry")
			return
		}

		// If the oldRatio is negative, which is not normal status.
		// It should set default value "1.1" after PiTR finished.
		if strings.HasPrefix(oldRatio, "-") {
			log.Warn("the original gc-ratio is negative, reset by default value 1.1", zap.String("old-gc-ratio", oldRatio))
			oldRatio = utils.DefaultGcRatioVal
		}
		log.Info("start to restore gc", zap.String("ratio", oldRatio))
		// A failure here is only logged; the "finish" message below is logged either way.
		if err := restoreGc(oldRatio); err != nil {
			log.Error("failed to set gc enabled", zap.Error(err))
		}
		log.Info("finish restoring gc")
	}()

	var sstCheckpointSets map[string]struct{}
	if cfg.UseCheckpoint {
		// In checkpoint mode the GC ratio returned by the checkpoint metadata
		// replaces the one just read, and is what the deferred function restores.
		oldRatioFromCheckpoint, err := client.InitCheckpointMetadataForLogRestore(ctx, cfg.StartTS, cfg.RestoreTS, oldRatio, cfg.tiflashRecorder)
		if err != nil {
			return errors.Trace(err)
		}
		oldRatio = oldRatioFromCheckpoint
		sstCheckpointSets, err = client.InitCheckpointMetadataForCompactedSstRestore(ctx)
		if err != nil {
			return errors.Trace(err)
		}
	}
	encryptionManager, err := encryption.NewManager(&cfg.LogBackupCipherInfo, &cfg.MasterKeyConfig)
	if err != nil {
		return errors.Annotate(err, "failed to create encryption manager for log restore")
	}
	defer encryptionManager.Close()
	err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize, encryptionManager)
	if err != nil {
		return err
	}
	migs, err := client.GetMigrations(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	client.BuildMigrations(migs)

	// get full backup meta storage to generate rewrite rules.
	fullBackupStorage, err := parseFullBackupTablesStorage(cfg)
	if err != nil {
		return errors.Trace(err)
	}
	// load the id maps only when the checkpoint mode is used and not the first execution
	currentIdMapSaved := false
	if taskInfo != nil && taskInfo.Progress == checkpoint.InLogRestoreAndIdMapPersist {
		currentIdMapSaved = true
	}

	ddlFiles, err := client.LoadDDLFilesAndCountDMLFiles(ctx)
	if err != nil {
		return err
	}

	// get the schemas ID replace information.
	// since targeted full backup storage, need to use the full backup cipher
	tableMappingManager, err := client.BuildTableMappingManager(ctx, &logclient.BuildTableMappingManagerConfig{
		CurrentIdMapSaved: currentIdMapSaved,
		TableFilter:       cfg.TableFilter,
		FullBackupStorage: fullBackupStorage,
		CipherInfo:        &cfg.Config.CipherInfo,
		Files:             ddlFiles,
	})
	if err != nil {
		return errors.Trace(err)
	}

	schemasReplace := stream.NewSchemasReplace(tableMappingManager.DbReplaceMap, cfg.tiflashRecorder,
		client.CurrentTS(), cfg.TableFilter, client.RecordDeleteRange)
	// Keep the TiFlash recorder in sync with every rewritten table.
	schemasReplace.AfterTableRewritten = func(deleted bool, tableInfo *model.TableInfo) {
		// When the table replica changed to 0, the tiflash replica might be set to `nil`.
		// We should remove the table if we meet.
		if deleted || tableInfo.TiFlashReplica == nil {
			cfg.tiflashRecorder.DelTable(tableInfo.ID)
			return
		}
		cfg.tiflashRecorder.AddTable(tableInfo.ID, *tableInfo.TiFlashReplica)
		// Remove the replica firstly. Let's restore them at the end.
		tableInfo.TiFlashReplica = nil
	}

	// updateStats adds to the restored totals only.
	updateStats := func(kvCount uint64, size uint64) {
		mu.Lock()
		defer mu.Unlock()
		totalKVCount += kvCount
		totalSize += size
	}

	pm := g.StartProgress(ctx, "Restore Meta Files", int64(len(ddlFiles)), !cfg.LogProgress)
	if err = withProgress(pm, func(p glue.Progress) error {
		client.RunGCRowsLoader(ctx)
		return client.RestoreAndRewriteMetaKVFiles(ctx, ddlFiles, schemasReplace, updateStats, p.Inc)
	}); err != nil {
		return errors.Annotate(err, "failed to restore meta files")
	}

	// The rewrite rules are derived after the meta files have been processed.
	rewriteRules := initRewriteRules(schemasReplace)

	ingestRecorder := schemasReplace.GetIngestRecorder()
	if err := rangeFilterFromIngestRecorder(ingestRecorder, rewriteRules); err != nil {
		return errors.Trace(err)
	}

	logFilesIter, err := client.LoadDMLFiles(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	compactionIter := client.LogFileManager.GetCompactionIter(ctx)

	// NOTE(review): `se` is not closed anywhere in this function — confirm whether
	// the session needs an explicit release.
	se, err := g.CreateSession(mgr.GetStorage())
	if err != nil {
		return errors.Trace(err)
	}
	execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor()
	splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx)
	log.Info("[Log Restore] get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys))

	pd := g.StartProgress(ctx, "Restore Files(SST + KV)", logclient.TotalEntryCount, !cfg.LogProgress)
	err = withProgress(pd, func(p glue.Progress) (pErr error) {
		// updateStatsWithCheckpoint adds to both the totals and the
		// skipped-by-checkpoint counters, and advances the progress by kvCount.
		updateStatsWithCheckpoint := func(kvCount, size uint64) {
			mu.Lock()
			defer mu.Unlock()
			totalKVCount += kvCount
			totalSize += size
			checkpointTotalKVCount += kvCount
			checkpointTotalSize += size
			// increase the progress
			p.IncBy(int64(kvCount))
		}
		compactedSplitIter, err := client.WrapCompactedFilesIterWithSplitHelper(
			ctx, compactionIter, rewriteRules, sstCheckpointSets,
			updateStatsWithCheckpoint, splitSize, splitKeys,
		)
		if err != nil {
			return errors.Trace(err)
		}

		// The compacted SST files are restored before the KV log files.
		err = client.RestoreCompactedSstFiles(ctx, compactedSplitIter, rewriteRules, importModeSwitcher, p.IncBy)
		if err != nil {
			return errors.Trace(err)
		}

		logFilesIterWithSplit, err := client.WrapLogFilesIterWithSplitHelper(ctx, logFilesIter, execCtx, rewriteRules, updateStatsWithCheckpoint, splitSize, splitKeys)
		if err != nil {
			return errors.Trace(err)
		}

		if cfg.UseCheckpoint {
			// TODO make a failpoint iter inside the logclient.
			failpoint.Inject("corrupt-files", func(v failpoint.Value) {
				var retErr error
				logFilesIterWithSplit, retErr = logclient.WrapLogFilesIterWithCheckpointFailpoint(v, logFilesIterWithSplit, rewriteRules)
				// The deferred assignment sets the named result pErr to retErr on return.
				defer func() { pErr = retErr }()
			})
		}

		return client.RestoreKVFiles(ctx, rewriteRules, logFilesIterWithSplit,
			cfg.PitrBatchCount, cfg.PitrBatchSize, updateStats, p.IncBy, &cfg.LogBackupCipherInfo, cfg.MasterKeyConfig.MasterKeys)
	})
	if err != nil {
		return errors.Annotate(err, "failed to restore kv files")
	}

	// failpoint to stop for a while after restoring kvs
	// this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh
	failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) {
		if val.(bool) {
			// not ideal to use sleep but not sure what's the better way right now
			log.Info("sleep after restoring kv")
			time.Sleep(2 * time.Second)
		}
	})

	// make sure schema reload finishes before proceeding
	if err = waitUntilSchemaReload(ctx, client); err != nil {
		return errors.Trace(err)
	}

	if err = client.CleanUpKVFiles(ctx); err != nil {
		return errors.Annotate(err, "failed to clean up")
	}

	if err = client.InsertGCRows(ctx); err != nil {
		return errors.Annotate(err, "failed to insert rows into gc_delete_range")
	}

	if err = client.RepairIngestIndex(ctx, ingestRecorder, g); err != nil {
		return errors.Annotate(err, "failed to repair ingest index")
	}

	if cfg.tiflashRecorder != nil {
		sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema())
		log.Info("Generating SQLs for restoring TiFlash Replica",
			zap.Strings("sqls", sqls))
		err = g.UseOneShotSession(mgr.GetStorage(), false, func(se glue.Session) error {
			for _, sql := range sqls {
				// A failing DDL is only reported as a warning so that the
				// remaining statements still run; the callback always returns nil.
				if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
					logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.",
						logutil.ShortError(errExec),
						zap.String("sql", sql),
					)
				}
			}
			return nil
		})
		if err != nil {
			return err
		}
	}

	failpoint.Inject("do-checksum-with-rewrite-rules", func(_ failpoint.Value) {
		if err := client.FailpointDoChecksumForLogRestore(ctx, mgr.GetStorage().GetClient(), mgr.GetPDClient(), rewriteRules); err != nil {
			failpoint.Return(errors.Annotate(err, "failed to do checksum"))
		}
	})

	// Everything succeeded: allow the deferred function to restore the GC ratio.
	gcDisabledRestorable = true

	return nil
}

// createRestoreClient builds and fully initializes a log restore client.
//
// The client is created with keepalive permitted without active streams, then
// gets its storage, crypter, upstream cluster ID, import clients and raw KV
// batch client configured. If any step after Init fails, the client is closed
// before the error is returned.
func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *conn.Mgr) (*logclient.LogClient, error) {
	keepalive := GetKeepalive(&cfg.Config)
	keepalive.PermitWithoutStream = true
	client := logclient.NewRestoreClient(mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), keepalive)

	// A failed Init returns without closing the client.
	if initErr := client.Init(ctx, g, mgr.GetStorage()); initErr != nil {
		return nil, errors.Trace(initErr)
	}

	// From here on, `err` tracks the latest failure so that the deferred
	// function can release the half-initialized client.
	var err error
	defer func() {
		if err == nil {
			return
		}
		client.Close(ctx)
	}()

	backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
	if err != nil {
		return nil, errors.Trace(err)
	}

	extOpts := getExternalStorageOptions(&cfg.Config, backend)
	err = client.SetStorage(ctx, backend, &extOpts)
	if err != nil {
		return nil, errors.Trace(err)
	}
	client.SetCrypter(&cfg.CipherInfo)
	client.SetUpstreamClusterID(cfg.upstreamClusterID)

	// always create a new session for checkpoint runner
	// because session is not thread safe
	newCheckpointSession := func() (glue.Session, error) {
		if !cfg.UseCheckpoint {
			return nil, nil
		}
		return g.CreateSession(mgr.GetStorage())
	}
	err = client.InitClients(ctx, backend, newCheckpointSession, uint(cfg.Concurrency), cfg.ConcurrencyPerStore.Value)
	if err != nil {
		return nil, errors.Trace(err)
	}

	err = client.SetRawKVBatchClient(ctx, cfg.PD, cfg.TLS.ToKVSecurity())
	if err != nil {
		return nil, errors.Trace(err)
	}

	return client, nil
}

// rangeFilterFromIngestRecorder rewrites the table IDs of the items held by recorder
// according to rewriteRules.
//
// An item whose table has no rewrite rule is reported to the recorder with the
// second result set to true, since that table's files are skipped as well.
// A rule that rewrites a table ID to 0 is reported as an error.
// TODO: need to implement the range filter out feature
func rangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*restoreutils.RewriteRules) error {
	mapTableID := func(tableID int64) (int64, bool, error) {
		rule, ok := rewriteRules[tableID]
		if !ok {
			// since the table's files will be skipped restoring, here also skips.
			return 0, true, nil
		}
		if rewritten := restoreutils.GetRewriteTableID(tableID, rule); rewritten != 0 {
			return rewritten, false, nil
		}
		return 0, false, errors.Errorf("newTableID is 0, tableID: %d", tableID)
	}
	return errors.Trace(recorder.RewriteTableID(mapTableID))
}

// getExternalStorageOptions builds the external storage options for backend u.
//
// The credential flags are copied from cfg. A default HTTP client sized by
// cfg.MetadataDownloadBatchSize is attached unless u is a GCS backend, in which
// case the HTTP client is left nil.
func getExternalStorageOptions(cfg *Config, u *backuppb.StorageBackend) storage.ExternalStorageOptions {
	var client *http.Client
	if gcs := u.GetGcs(); gcs == nil {
		client = storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize)
	}

	var opts storage.ExternalStorageOptions
	opts.NoCredentials = cfg.NoCreds
	opts.SendCredentials = cfg.SendCreds
	opts.HTTPClient = client
	return opts
}

// checkLogRange validates that the requested restore range lies inside the range
// covered by the log backup, i.e.
//
//	logMinTS <= restoreFromTS <= restoreToTS <= logMaxTS
//
// It returns an error annotated on berrors.ErrInvalidArgument otherwise.
func checkLogRange(restoreFromTS, restoreToTS, logMinTS, logMaxTS uint64) error {
	inRange := logMinTS <= restoreFromTS &&
		restoreFromTS <= restoreToTS &&
		restoreToTS <= logMaxTS
	if inRange {
		return nil
	}

	return errors.Annotatef(berrors.ErrInvalidArgument,
		"restore log from %d(%s) to %d(%s), "+
			" but the current existed log from %d(%s) to %d(%s)",
		restoreFromTS, oracle.GetTimeFromTS(restoreFromTS),
		restoreToTS, oracle.GetTimeFromTS(restoreToTS),
		logMinTS, oracle.GetTimeFromTS(logMinTS),
		logMaxTS, oracle.GetTimeFromTS(logMaxTS),
	)
}

// withProgress runs cc with the progress p and closes p once cc has returned,
// whatever the outcome. The result of cc is returned unchanged.
func withProgress(p glue.Progress, cc func(p glue.Progress) error) error {
	defer func() {
		p.Close()
	}()
	runErr := cc(p)
	return runErr
}

// backupLogInfo describes the range covered by a log backup storage, as computed
// by getLogRangeWithStorage.
type backupLogInfo struct {
	// logMaxTS is the largest global checkpoint TS found in the storage,
	// raised to logMinTS when it would otherwise be smaller.
	logMaxTS  uint64
	// logMinTS is the larger one of the start version recorded in the backupmeta
	// and the truncate safepoint TS.
	logMinTS  uint64
	// clusterID is the cluster ID recorded in the backupmeta.
	clusterID uint64
}

// getLogRange opens the storage configured in cfg.Storage and returns the
// log-min-ts, log-max-ts and cluster ID of the log backup stored there.
func getLogRange(
	ctx context.Context,
	cfg *Config,
) (backupLogInfo, error) {
	var none backupLogInfo

	_, extStorage, err := GetStorage(ctx, cfg.Storage, cfg)
	if err != nil {
		return none, errors.Trace(err)
	}
	return getLogRangeWithStorage(ctx, extStorage)
}

// getLogRangeWithStorage computes the range covered by the log backup stored in s.
//
//   - The backupmeta file provides the start version and the cluster ID. A meta
//     whose end version is greater than 0 means the storage was used for
//     `br backup`, which is rejected.
//   - The truncate safepoint file provides the truncate TS (0 when the log has
//     never been truncated).
//   - logMinTS is the larger of the start version and the truncate TS.
//   - logMaxTS is the max global checkpoint found in s, but never below logMinTS.
func getLogRangeWithStorage(
	ctx context.Context,
	s storage.ExternalStorage,
) (backupLogInfo, error) {
	var empty backupLogInfo

	// Read and decode the backupmeta file of the log backup.
	raw, err := s.ReadFile(ctx, metautil.MetaFile)
	if err != nil {
		return empty, errors.Trace(err)
	}
	meta := &backuppb.BackupMeta{}
	if err = meta.Unmarshal(raw); err != nil {
		return empty, errors.Trace(err)
	}
	// endVersion > 0 represents that the storage has been used for `br backup`
	if meta.GetEndVersion() > 0 {
		return empty, errors.Annotate(berrors.ErrStorageUnknown,
			"the storage has been used for full backup")
	}

	// If truncateTS equals 0, the stream log has never been truncated.
	truncateTS, err := stream.GetTSFromFile(ctx, s, stream.TruncateSafePointFileName)
	if err != nil {
		return empty, errors.Trace(err)
	}

	// get max global resolved ts from metas.
	globalCheckpoint, err := getGlobalCheckpointFromStorage(ctx, s)
	if err != nil {
		return empty, errors.Trace(err)
	}

	info := backupLogInfo{clusterID: meta.ClusterId}
	info.logMinTS = max(meta.GetStartVersion(), truncateTS)
	info.logMaxTS = max(info.logMinTS, globalCheckpoint)
	return info, nil
}

// getGlobalCheckpointFromStorage returns the largest global checkpoint TS recorded
// under the stream backup global-checkpoint prefix of s.
//
// Every file whose path ends with ".ts" under that prefix is read and its
// leading 8 bytes are decoded as a little-endian uint64; other files are
// ignored. 0 is returned when no such file exists.
//
// A ".ts" file holding fewer than 8 bytes (for example an empty or partially
// written file) is reported as an error instead of being decoded, because
// binary.LittleEndian.Uint64 panics on a short slice.
func getGlobalCheckpointFromStorage(ctx context.Context, s storage.ExternalStorage) (uint64, error) {
	// tsLen is the encoded size of one checkpoint TS.
	const tsLen = 8

	var globalCheckPointTS uint64
	opt := storage.WalkOption{SubDir: stream.GetStreamBackupGlobalCheckpointPrefix()}
	err := s.WalkDir(ctx, &opt, func(path string, size int64) error {
		if !strings.HasSuffix(path, ".ts") {
			return nil
		}

		buff, err := s.ReadFile(ctx, path)
		if err != nil {
			return errors.Trace(err)
		}
		if len(buff) < tsLen {
			return errors.Errorf(
				"the global checkpoint file %s is malformed: expect at least %d bytes but got %d",
				path, tsLen, len(buff))
		}
		ts := binary.LittleEndian.Uint64(buff)
		globalCheckPointTS = max(ts, globalCheckPointTS)
		return nil
	})
	return globalCheckPointTS, errors.Trace(err)
}

// getFullBackupTS reads the backupmeta of the full backup at cfg.FullBackupStorage
// and returns its snapshot TS and its cluster ID, in that order.
//
// The meta file is decrypted with cfg.CipherInfo when needed before decoding.
func getFullBackupTS(
	ctx context.Context,
	cfg *RestoreConfig,
) (uint64, uint64, error) {
	_, fullStorage, err := GetStorage(ctx, cfg.FullBackupStorage, &cfg.Config)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}

	raw, err := fullStorage.ReadFile(ctx, metautil.MetaFile)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}

	plain, err := metautil.DecryptFullBackupMetaIfNeeded(raw, &cfg.CipherInfo)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}

	var meta backuppb.BackupMeta
	if err = meta.Unmarshal(plain); err != nil {
		return 0, 0, errors.Trace(err)
	}

	// start and end are identical in full backup, so the end version is used.
	snapshotTS, clusterID := meta.GetEndVersion(), meta.GetClusterId()
	return snapshotTS, clusterID, nil
}

// parseFullBackupTablesStorage builds the storage config used to reach the full
// backup at cfg.FullBackupStorage.
//
// It returns (nil, nil) when no full backup storage is specified.
func parseFullBackupTablesStorage(
	cfg *RestoreConfig,
) (*logclient.FullBackupStorageConfig, error) {
	if len(cfg.FullBackupStorage) == 0 {
		log.Info("the full backup path is not specified, so BR will try to get id maps")
		return nil, nil
	}

	backend, err := storage.ParseBackend(cfg.FullBackupStorage, &cfg.BackendOptions)
	if err != nil {
		return nil, errors.Trace(err)
	}

	fullCfg := &logclient.FullBackupStorageConfig{Backend: backend}
	fullCfg.Opts = storageOpts(&cfg.Config)
	return fullCfg, nil
}

// initRewriteRules builds the rewrite rules, keyed by the old (upstream) ID, for
// every table and partition in schemasReplace that passes the table filter.
//
// System databases and schemas rejected by the filter are skipped entirely.
// A partition shares the index map of its table. When an old ID shows up more
// than once, the rule registered first is kept.
func initRewriteRules(schemasReplace *stream.SchemasReplace) map[int64]*restoreutils.RewriteRules {
	rules := make(map[int64]*restoreutils.RewriteRules)
	tableFilter := schemasReplace.TableFilter

	for _, db := range schemasReplace.DbMap {
		if utils.IsSysDB(db.Name) || !tableFilter.MatchSchema(db.Name) {
			continue
		}

		for oldTableID, tbl := range db.TableMap {
			if !tableFilter.MatchTable(db.Name, tbl.Name) {
				continue
			}

			// addRule registers the rule oldID -> newID unless oldID already has one.
			addRule := func(oldID, newID int64) {
				if _, seen := rules[oldID]; seen {
					return
				}
				log.Info("add rewrite rule",
					zap.String("tableName", db.Name+"."+tbl.Name),
					zap.Int64("oldID", oldID), zap.Int64("newID", newID))
				rules[oldID] = restoreutils.GetRewriteRuleOfTable(oldID, newID, 0, tbl.IndexMap, false)
			}

			addRule(oldTableID, tbl.TableID)
			for oldPartID, newPartID := range tbl.PartitionMap {
				addRule(oldPartID, newPartID)
			}
		}
	}
	return rules
}

// ShiftTS returns a TS that is streamShiftDuration earlier than startTS, keeping
// the logical part unchanged. The gap leaves a safe duration for transactions.
// It returns 0 when the shifted physical time would be negative.
func ShiftTS(startTS uint64) uint64 {
	shifted := oracle.ExtractPhysical(startTS) - streamShiftDuration.Milliseconds()
	if shifted < 0 {
		return 0
	}
	return oracle.ComposeTS(shifted, oracle.ExtractLogical(startTS))
}

// buildPauseSafePointName returns the pause safepoint name of the task, which is
// taskName followed by "_pause_safepoint".
func buildPauseSafePointName(taskName string) string {
	const suffix = "_pause_safepoint"
	// Both operands are strings, so Sprint joins them without a separator.
	return fmt.Sprint(taskName, suffix)
}

// checkPiTRRequirements checks the PiTR precondition on the target cluster by
// delegating to restore.AssertUserDBsEmpty with the domain of mgr.
func checkPiTRRequirements(mgr *conn.Mgr) error {
	dom := mgr.GetDomain()
	return restore.AssertUserDBsEmpty(dom)
}

// PiTRTaskInfo is the result of checkPiTRTaskInfo; it tells the PiTR entry point
// how the current run should proceed.
type PiTRTaskInfo struct {
	// CheckpointInfo is the checkpoint task info loaded when checkpoint mode is
	// used; it is nil otherwise.
	CheckpointInfo      *checkpoint.CheckpointTaskInfoForLogRestore
	// NeedFullRestore is true when a full backup storage is specified and no log
	// restore checkpoint of the same task was found.
	NeedFullRestore     bool
	// FullRestoreCheckErr holds the error of the PiTR requirements check. It is
	// recorded here rather than returned, so the check can be handled later.
	FullRestoreCheckErr error
}

// checkPiTRTaskInfo decides how the current PiTR run should proceed.
//
// In checkpoint mode it loads the checkpoint task info. When log restore
// checkpoint metadata exists, the run must target the same upstream cluster and
// the same [StartTS, RestoreTS] range as recorded, otherwise an error is
// returned; if it matches, the full restore is skipped.
//
// When the full restore is still needed and this is the first execution (not in
// checkpoint mode, or no checkpoint/snapshot metadata yet), the PiTR
// requirements are checked; a failure is stored in FullRestoreCheckErr instead
// of being returned.
func checkPiTRTaskInfo(
	ctx context.Context,
	mgr *conn.Mgr,
	g glue.Glue,
	cfg *RestoreConfig,
) (*PiTRTaskInfo, error) {
	needFull := len(cfg.FullBackupStorage) > 0
	var cpInfo *checkpoint.CheckpointTaskInfoForLogRestore
	result := &PiTRTaskInfo{}

	if cfg.UseCheckpoint {
		se, err := g.CreateSession(mgr.GetStorage())
		if err != nil {
			return nil, errors.Trace(err)
		}

		sqlExec := se.GetSessionCtx().GetRestrictedSQLExecutor()
		cpInfo, err = checkpoint.TryToGetCheckpointTaskInfo(ctx, mgr.GetDomain(), sqlExec)
		if err != nil {
			return result, errors.Trace(err)
		}
		// the log restore checkpoint metadata is persist, so the PITR is in the log restore stage.
		if meta := cpInfo.Metadata; meta != nil {
			// TODO: check whether user has manually modified the cluster(ddl). If so, regard the behavior
			//       as restore from scratch. (update `curTaskInfo.RewriteTs` to 0 as an uninitial value)
			switch {
			case meta.UpstreamClusterID != cfg.upstreamClusterID:
				return result, errors.Errorf(
					"The upstream cluster id[%d] of the current log restore does not match that[%d] recorded in checkpoint. "+
						"Perhaps you should specify the last log backup storage instead, "+
						"or just clean the checkpoint database[%s] if the cluster has been cleaned up.",
					cfg.upstreamClusterID, meta.UpstreamClusterID, checkpoint.LogRestoreCheckpointDatabaseName)
			case meta.StartTS != cfg.StartTS || meta.RestoredTS != cfg.RestoreTS:
				return result, errors.Errorf(
					"The current log restore want to restore cluster from %d to %d, "+
						"which is different from that from %d to %d recorded in checkpoint. "+
						"Perhaps you should specify the last full backup storage to match the start-ts and "+
						"the parameter --restored-ts to match the restored-ts. "+
						"or just clean the checkpoint database[%s] if the cluster has been cleaned up.",
					cfg.StartTS, cfg.RestoreTS, meta.StartTS, meta.RestoredTS, checkpoint.LogRestoreCheckpointDatabaseName,
				)
			}

			log.Info("detect log restore checkpoint. so skip snapshot restore and start log restore from the checkpoint")
			// the same task, skip full restore because it is already in the log restore stage.
			needFull = false
		}
	}

	result.CheckpointInfo = cpInfo
	result.NeedFullRestore = needFull
	if !needFull {
		return result, nil
	}

	// restore full snapshot precheck.
	// Only when use checkpoint and not the first execution,
	// skip checking requirements.
	firstExecution := !cfg.UseCheckpoint || (cpInfo.Metadata == nil && !cpInfo.HasSnapshotMetadata)
	if firstExecution {
		log.Info("check pitr requirements for the first execution")
		if err := checkPiTRRequirements(mgr); err != nil {
			// delay cluster checks after we get the backupmeta.
			// for the case that the restore inc + log backup,
			// we can still restore them.
			result.FullRestoreCheckErr = err
		}
	}

	return result, nil
}

// waitUntilSchemaReload blocks until the schema lease of the client's domain is no
// longer expired, polling via utils.WaitUntil with the package-level check
// interval and timeout. The time taken is logged on success.
func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error {
	log.Info("waiting for schema info finishes reloading")
	begin := time.Now()

	leaseValid := func() bool {
		expired := client.GetDomain().IsLeaseExpired()
		return !expired
	}
	err := utils.WaitUntil(ctx, leaseValid, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout)
	if err != nil {
		return errors.Annotate(err, "failed to wait until schema reload")
	}

	log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(begin)))
	return nil
}
